package com.ld.shieldsb.canalclient.handler.impl.solr;

import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;

import com.ld.shieldsb.canalclient.handler.BaseSyncService;
import com.ld.shieldsb.canalclient.handler.config.MappingConfig;
import com.ld.shieldsb.canalclient.handler.config.MappingConfig.DbMapping;
import com.ld.shieldsb.canalclient.model.SingleDml;
import com.ld.shieldsb.canalclient.model.SyncResult;
import com.ld.shieldsb.canalclient.util.CanalUtil;
import com.ld.shieldsb.canalclient.util.SyncUtil;
import com.ld.shieldsb.common.composition.util.ConvertUtil.MapUtil;
import com.ld.shieldsb.common.core.util.solr.SolrUtil;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Data
@EqualsAndHashCode(callSuper = true)
public class SolrSyncService extends BaseSyncService {
    private Map<String, SolrClient> solrClientMap;
    // 源库表字段类型缓存: instance.schema.table -> <columnName, jdbcType>
    private Map<String, Map<String, String>> columnsTypeCache;

    public SolrSyncService(Map<String, SolrClient> solrClientMap) {
        this(solrClientMap, null, true);
    }

    public SolrSyncService(Map<String, SolrClient> solrClientMap, Integer threads, boolean skipDupException) {
        super(threads, skipDupException);
        this.columnsTypeCache = new ConcurrentHashMap<>();
        this.solrClientMap = solrClientMap;
    }

    private void putPk2values(MappingConfig config, SingleDml dml, Map<String, Object> values) {
        // 拼接主键
        for (Map.Entry<String, String> entry : config.getDbMapping().getTargetPk().entrySet()) {
            String targetColumnName = entry.getKey();
            String srcColumnName = entry.getValue();
            if (srcColumnName == null) {
                srcColumnName = CanalUtil.cleanColumn(targetColumnName);
            }
            // 如果有修改主键的情况
            if (dml.getOld() != null && dml.getOld().containsKey(srcColumnName)) {
                values.put(targetColumnName, dml.getOld().get(srcColumnName));
            } else {
                values.put(targetColumnName, dml.getData().get(srcColumnName));
            }
        }
    }

    @Override
    public SyncResult insert(int index, MappingConfig config, SingleDml dml) throws Exception {
        SyncResult result = new SyncResult();

        Map<String, Object> data = dml.getData();
        if (data == null || data.isEmpty()) { // 无数据直接返回
            result.setState(SyncResult.HAND_STATE_IGNORE);
            result.setMsg("数据为空！");
            return result;
        }
        SolrClient solrClient = solrClientMap.get(dml.getTable());

        DbMapping dbMapping = config.getDbMapping();

        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
        // 获取目标列的类型
        Map<String, String> ctype = getTargetColumnType(solrClient, config);

        Map<String, Object> values = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
            String targetColumnName = entry.getKey();
            String srcColumnName = entry.getValue();
            if (srcColumnName == null) {
                srcColumnName = CanalUtil.cleanColumn(targetColumnName);
            }
            String ctypeKey = SyncUtil.getIgnoreCaseColunm(ctype, targetColumnName);
            String type = ctype.get(ctypeKey);
            if (type == null) { // 目标列不存在
                log.warn(getNotMatchColumnMsg(config, targetColumnName));
            } else {
                Object value = data.get(srcColumnName);
                values.put(ctypeKey, value);
            }
        }

        SolrUtil.add(solrClient, values); // update的时候不能用此方法，否则values中没有的字段会被覆盖为空
        return result;
    }

    /**
     * 更新
     * 
     * @Title update
     * @author 吕凯
     * @date 2021年12月6日 下午5:46:57
     * @param solrClient
     * @param config
     * @param dml
     * @throws SQLException
     *             void
     */
    @Override
    public SyncResult update(int index, MappingConfig config, SingleDml dml) throws Exception {
        SyncResult result = new SyncResult();

        Map<String, Object> data = dml.getData(); // 修改后的全部数据
        Map<String, Object> old = dml.getOld(); // 旧数据
        if (MapUtil.isEmpty(data) || MapUtil.isEmpty(old)) { // 无数据直接返回
            result.setState(SyncResult.HAND_STATE_IGNORE);
            result.setMsg("数据为空！");
            return result;
        }
        SolrClient solrClient = solrClientMap.get(dml.getTable());

        DbMapping dbMapping = config.getDbMapping();

        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
        Map<String, String> ctype = getTargetColumnType(solrClient, config);

        Map<String, Object> values = new HashMap<>(); // 值的map
        boolean hasMatched = false;

        for (String srcColumnName : old.keySet()) { // 对更新的字段进行过滤
            List<String> targetColumnNames = new ArrayList<>(); // 目标库字段
            columnsMap.forEach((targetColumn, srcColumn) -> { // 过滤源表字段并转换为目标库字段
                if (srcColumnName.equalsIgnoreCase(srcColumn)) {
                    targetColumnNames.add(targetColumn);
                }
            });
            if (!targetColumnNames.isEmpty()) {
                for (String targetColumnName : targetColumnNames) {
                    String ctypeKey = SyncUtil.getIgnoreCaseColunm(ctype, targetColumnName);
                    String type = ctype.get(ctypeKey);
                    if (type == null) { // 目标列不存在
                        log.warn(getNotMatchColumnMsg(config, targetColumnName));
                    } else {
                        hasMatched = true;
                        values.put(ctypeKey, data.get(srcColumnName));
                    }
                }
            }
        }

        if (!hasMatched) {
            log.warn("Solr中没有任何匹配的字段可以进行更新操作！！！");
            result.setState(SyncResult.HAND_STATE_IGNORE);
            result.setMsg("没有匹配的字段！");
            return result;
        }
        String pkColumn = "id"; // 获取主键
        for (Map.Entry<String, String> entry : config.getDbMapping().getTargetPk().entrySet()) {
            String targetColumnName = entry.getKey();
            pkColumn = targetColumnName;
        }
        putPk2values(config, dml, values); // 主键放进去
        boolean flag = SolrUtil.update(solrClient, pkColumn, values); // 只更新变化字段
        if (!flag) {
            result.setState(SyncResult.HAND_STATE_ERROR);
        }
        return result;

    }

    /**
     * 获取没有匹配字段的文本显示
     * 
     * @Title getNotMatchColumnMsg
     * @author 吕凯
     * @date 2021年12月8日 下午2:21:40
     * @param config
     * @param dbMapping
     * @param targetColumnName
     * @return String
     */
    private String getNotMatchColumnMsg(MappingConfig config, String targetColumnName) {
        DbMapping dbMapping = config.getDbMapping();
        String sourceTable = config.getDestination() + "." + dbMapping.getDatabase() + "." + dbMapping.getTable();
        String targetTable = dbMapping.getTargetTable();
        return sourceTable + "目标字段: " + targetColumnName + " 在目标solr " + targetTable + " 中没有匹配的字段，忽略！";
    }

    /**
     * 获取目标solr的字段类型
     * 
     * @Title getTargetColumnType
     * @author 吕凯
     * @date 2021年12月22日 上午9:54:58
     * @param solrClient
     *            solr客户端
     * @param config
     *            映射配置
     * @return Map<String,String>
     */
    private Map<String, String> getTargetColumnType(SolrClient solrClient, MappingConfig config) {
        DbMapping dbMapping = config.getDbMapping();
        String cacheKey = config.getDestination() + "." + dbMapping.getDatabase() + "." + dbMapping.getTable();
        Map<String, String> columnType = columnsTypeCache.get(cacheKey);
        if (columnType == null) {
            synchronized (SolrSyncService.class) {
                columnType = columnsTypeCache.get(cacheKey);
                if (columnType == null) {
                    columnType = new LinkedHashMap<>();
                    final Map<String, String> columnTypeTmp = columnType;

                    try {
                        SolrUtil.getFieldInfo(solrClient).forEach((key, field) -> {
                            columnTypeTmp.put(key, field.getType());
                        });
                    } catch (SolrServerException | IOException e) {
                        log.error("获取字段类型出错！", e);
                    }
                }
            }
        }
        return columnType;
    }

    /**
     * 删除数据
     * 
     * @Title delete
     * @author 吕凯
     * @date 2021年12月6日 下午4:44:23
     * @param solrClient
     * @param config
     * @param dml
     * @throws SQLException
     *             void
     */
    @Override
    public SyncResult delete(int index, MappingConfig config, SingleDml dml) throws Exception {
        SyncResult result = new SyncResult();

        Map<String, Object> data = dml.getData();
        if (data == null || data.isEmpty()) { // 无数据直接返回
            result.setState(SyncResult.HAND_STATE_IGNORE);
            result.setMsg("数据为空！");
            return result;
        }
        SolrClient solrClient = solrClientMap.get(dml.getTable());

        DbMapping dbMapping = config.getDbMapping();

        String srcColumnName = "id";
        // 主键映射
        for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
            String targetColumnName = entry.getKey();
            srcColumnName = entry.getValue();
            if (srcColumnName == null) {
                srcColumnName = CanalUtil.cleanColumn(targetColumnName);
            }

        }
        Object id = dml.getData().get(srcColumnName);

        // 根据ID删除solr的索引库
        boolean flag = SolrUtil.delById(solrClient, id);
        if (!flag) {
            result.setState(SyncResult.HAND_STATE_ERROR);
        }
        return result;

    }

    @Override
    public SyncResult truncate(int index, MappingConfig config) throws SQLException {
        log.warn("清空core的数据，暂未实现！");
        return SyncResult.builder().state(SyncResult.HAND_STATE_IGNORE).msg("暂未实现").build();
    }

    @Override
    public void submitSuccess(int index) throws Exception {
        log.warn("同步成功！");
    }

    @Override
    public void submitError(int index) throws Exception {
        log.warn("同步出错，回滚！");
    }

    @Override
    public void syncComplete() {
        log.warn("同步完成！");
    }

    @Override
    public void close() {
        super.closeResources();
        if (MapUtil.isNotEmpty(solrClientMap)) {
            for (SolrClient solrClient : solrClientMap.values()) {
                if (solrClient != null) {
                    try {
                        solrClient.close();
                    } catch (IOException e) {
                        log.error("", e);
                    }
                }
            }

        }

    }
}
